import json
import time
import random
from datetime import datetime
from faker import Faker
from kafka import KafkaProducer

# 初始化Faker和Kafka生产者
fake = Faker('zh_CN')
producer = KafkaProducer(
    bootstrap_servers=['node101:9092','node102:9092','node103:9092'],  # Kafka服务器地址
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
)

# 模拟车辆类型和事件类型
vehicle_types = ['小型汽车', '公交车', '货车', '出租车', '摩托车']
event_types = ['normal', 'overspeed', 'redlight', 'illegal_lane_change']
directions = ['东', '西', '南', '北']

def generate_traffic_event():
    """生成一条模拟交通事件数据"""
    return {
        "eventId": fake.uuid4(),
        "cameraId": f"CAM-{random.randint(1, 100):03d}",  # 模拟100个摄像头
        "licensePlate": fake.license_plate(),
        "vehicleType": random.choice(vehicle_types),
        "roadId": f"ROAD-{random.randint(1, 50):03d}",  # 模拟50条道路
        "crossingId": f"CROSS-{random.randint(1, 20):03d}",  # 模拟20个路口
        "ts": int(datetime.now().timestamp() * 1000),
        "speed": round(random.uniform(10, 120), 1),  # 车速范围10-120 km/h
        "direction": random.choice(directions),
        "lane": random.randint(1, 4),
        "eventType": random.choices(
            event_types,
            weights=[0.7, 0.1, 0.1, 0.1]  # 正常事件占70%，其他违法事件各10%
        )[0],
        "areaId": f"AREA-{random.randint(1, 5):03d}"  # 模拟5个区域
    }

# 持续发送数据到Kafka
topic = 'traffic_events'
while True:
    event = generate_traffic_event()
    producer.send(topic, value=event)
    print(f"Sent: {event}")
    time.sleep(0.5)  # 每0.5秒发送一条，模拟实时数据流